Flink: Fix compatibility issue between 1.12.x and 1.13.x #3354
Conversation
  org.apache.avro:avro = 1.10.1
  org.apache.calcite:* = 1.10.0
- org.apache.flink:* = 1.13.2
+ org.apache.flink:* = 1.12.5
As @pnowojski's comment says (the comment from this email):
We are trying to provide forward compatibility: applications using @PublicAPIs compiled against Flink 1.12.x should work fine in Flink 1.13.x. We do not guarantee it the other way: applications compiled against Flink 1.13.x are not intended to work with Flink 1.12.x.
So we'd better use Flink 1.12.x to build the iceberg-flink-runtime jar so that it works against a Flink 1.13.x runtime. Currently, the latest 1.12.x version is 1.12.5.
@openinx @pnowojski there is a breaking change for SplitEnumerator in 1.13.
https://issues.apache.org/jira/browse/FLINK-22133
This revert to 1.12 breaks compilation of the FLIP-27 Iceberg source dev branch, which has been updated to the 1.13 SplitEnumerator API.
@stevenzwu, currently 1.12.5 is used to compile the common Flink module (let's say iceberg-flink) once we get this PR merged: #3364. Although Apache Flink introduced this breaking change, we can still make it work for both Flink 1.12.x and Flink 1.13.x by separating the differing classes into different modules (let's say iceberg-flink:iceberg-flink-1.12 and iceberg-flink:iceberg-flink-1.13).
// ResolvedCatalogTable class into the iceberg-flink-runtime jar for compatibility purpose.
private static final DynMethods.UnboundMethod GET_CATALOG_TABLE = DynMethods.builder("getCatalogTable")
    .impl(Context.class, "getCatalogTable")
    .build();
Won't this fail if there is no getCatalogTable method? And if the method exists then it wouldn't need to be called dynamically. You may need an orNoop() call here.
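For illustration, a minimal sketch of what the suggested fallback could look like, using the orNoop() option on Iceberg's DynMethods builder (a sketch of the suggestion only, not the PR's actual code):

    // Sketch of the suggestion: if getCatalogTable cannot be found on Context,
    // orNoop() makes build() return a no-op method instead of throwing.
    private static final DynMethods.UnboundMethod GET_CATALOG_TABLE = DynMethods.builder("getCatalogTable")
        .impl(Context.class, "getCatalogTable")
        .orNoop()
        .build();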
And if the method exists then it wouldn't need to be called dynamically
The reason I use the dynamic approach is to avoid adding Flink 1.13's ResolvedCatalogTable into the runtime jar. For example, if we decompile FlinkDynamicTableFactory.class from iceberg-flink-runtime.jar (compiled against Flink 1.13.2) by using javap -c ./org/apache/iceberg/flink/FlinkDynamicTableFactory.class, it has the following JVM instructions:
Compiled from "FlinkDynamicTableFactory.java"
public class org.apache.iceberg.flink.FlinkDynamicTableFactory implements org.apache.flink.table.factories.DynamicTableSinkFactory,org.apache.flink.table.factories.DynamicTableSourceFactory {
static final java.lang.String FACTORY_IDENTIFIER;
public org.apache.iceberg.flink.FlinkDynamicTableFactory();
Code:
0: aload_0
1: invokespecial #43 // Method java/lang/Object."<init>":()V
4: aload_0
5: aconst_null
6: putfield #45 // Field catalog:Lorg/apache/iceberg/flink/FlinkCatalog;
9: return
public org.apache.iceberg.flink.FlinkDynamicTableFactory(org.apache.iceberg.flink.FlinkCatalog);
Code:
0: aload_0
1: invokespecial #43 // Method java/lang/Object."<init>":()V
4: aload_0
5: aload_1
6: putfield #45 // Field catalog:Lorg/apache/iceberg/flink/FlinkCatalog;
9: return
public org.apache.flink.table.connector.source.DynamicTableSource createDynamicTableSource(org.apache.flink.table.factories.DynamicTableFactory$Context);
Code:
0: aload_1
1: invokeinterface #54, 1 // InterfaceMethod org/apache/flink/table/factories/DynamicTableFactory$Context.getObjectIdentifier:()Lorg/apache/flink/table/catalog/ObjectIdentifier;
6: astore_2
7: aload_1
8: invokeinterface #58, 1 // InterfaceMethod org/apache/flink/table/factories/DynamicTableFactory$Context.getCatalogTable:()Lorg/apache/flink/table/catalog/ResolvedCatalogTable;
13: invokevirtual #64 // Method org/apache/flink/table/catalog/ResolvedCatalogTable.getOptions:()Ljava/util/Map;
16: astore_3
17: aload_1
18: invokeinterface #58, 1 // InterfaceMethod org/apache/flink/table/factories/DynamicTableFactory$Context.getCatalogTable:()Lorg/apache/flink/table/catalog/ResolvedCatalogTable;
23: astore 4
25: aload_1
26: invokeinterface #58, 1 // InterfaceMethod org/apache/flink/table/factories/DynamicTableFactory$Context.getCatalogTable:()Lorg/apache/flink/table/catalog/ResolvedCatalogTable;
31: invokevirtual #68 // Method org/apache/flink/table/catalog/ResolvedCatalogTable.getSchema:()Lorg/apache/flink/table/api/TableSchema;
34: invokestatic #74 // Method org/apache/flink/table/utils/TableSchemaUtils.getPhysicalSchema:(Lorg/apache/flink/table/api/TableSchema;)Lorg/apache/flink/table/api/TableSchema;
37: astore 5
39: aload_0
40: getfield #45 // Field catalog:Lorg/apache/iceberg/flink/FlinkCatalog;
43: ifnull 62
46: aload_0
47: getfield #45 // Field catalog:Lorg/apache/iceberg/flink/FlinkCatalog;
50: aload_2
51: invokevirtual #80 // Method org/apache/flink/table/catalog/ObjectIdentifier.toObjectPath:()Lorg/apache/flink/table/catalog/ObjectPath;
54: invokestatic #84 // Method createTableLoader:(Lorg/apache/iceberg/flink/FlinkCatalog;Lorg/apache/flink/table/catalog/ObjectPath;)Lorg/apache/iceberg/flink/TableLoader;
57: astore 6
59: goto 78
62: aload 4
64: aload_3
65: aload_2
66: invokevirtual #94 // Method org/apache/flink/table/catalog/ObjectIdentifier.getDatabaseName:()Ljava/lang/String;
In this line:
8: invokeinterface #58, 1 // InterfaceMethod org/apache/flink/table/factories/DynamicTableFactory$Context.getCatalogTable:()Lorg/apache/flink/table/catalog/ResolvedCatalogTable;
the ResolvedCatalogTable reference is compiled explicitly into iceberg-flink-runtime.jar, which is not what we expect, because the iceberg-flink-runtime jar should run in both Flink 1.12 and Flink 1.13 clusters.
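For reference, a rough sketch of how the caller side of the dynamic lookup might look; the point is that the result is assigned to the CatalogTable supertype, which exists in both Flink 1.12 and 1.13, so the compiled class never references ResolvedCatalogTable (variable names are illustrative, not the PR's exact code):

    // Sketch: invoke getCatalogTable reflectively and only refer to the CatalogTable
    // supertype, keeping ResolvedCatalogTable out of the compiled constant pool.
    CatalogTable catalogTable = GET_CATALOG_TABLE.bind(context).invoke();
    Map<String, String> tableProps = catalogTable.getOptions();
    TableSchema physicalSchema = TableSchemaUtils.getPhysicalSchema(catalogTable.getSchema());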
You may need an orNoop() call here.
This looks good to me for handling the extreme case.
Okay, so the method exists in both cases, but returns a more specific object in Flink 1.13?
Yes, that's correct.
@openinx, what do you think about taking an approach similar to what we've done in Spark and building a module for each supported Flink version? That would help us add support for multiple versions of Scala as well. I don't think that should block this PR, but I'd like to have a good plan for supporting multiple Flink versions in parallel without issues being reported by users.
That's exactly what I'm planning to push forward in the next step. Besides separating the Flink versions into different modules (if necessary), there are other things that will need to be addressed.
@rdblue, any other concerns about this PR?
This PR tries to address #3187 by using the approach from this comment: #3187 (comment)
In this way, the iceberg-flink-runtime jar compiled with Flink 1.12.x (or Flink 1.13.x) can work with Flink 1.13.x (or Flink 1.12.x). I verified this by hand on my localhost, and everything works fine.